package com.ruyuan.little.project.rocketmq.api.coupon.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.little.project.common.dto.CommonResponse;
import com.ruyuan.little.project.common.enums.ErrorCodeEnum;
import com.ruyuan.little.project.common.enums.LittleProjectTypeEnum;
import com.ruyuan.little.project.redis.api.RedisApi;
import com.ruyuan.little.project.rocketmq.api.coupon.dto.OrderFinishedMessageDTO;
import com.ruyuan.little.project.rocketmq.api.coupon.service.CouponService;
import com.ruyuan.little.project.rocketmq.api.order.dto.OrderInfoDTO;
import com.ruyuan.little.project.rocketmq.common.constants.RedisKeyConstant;
import org.apache.dubbo.config.annotation.Reference;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;

/**
 * Consumes "order finished" (check-out) messages and issues a coupon for each order.
 *
 * <p>Idempotency: before issuing a coupon, a Redis {@code setnx} marker keyed by the order
 * number is written; a message whose marker already exists is logged as a duplicate and skipped.
 *
 * @author 强军
 */
@Component
public class OrderFinishedMessageListener implements MessageListenerConcurrently {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderFinishedMessageListener.class);

    /**
     * Redis Dubbo service, used here to write the de-duplication marker.
     */
    @Reference(version = "1.0.0",
            interfaceClass = RedisApi.class,
            cluster = "failfast", check = false)
    private RedisApi redisApi;

    /**
     * Service that performs the actual coupon distribution.
     */
    @Resource
    private CouponService couponService;

    /**
     * Coupon id passed to the coupon service, read from {@code order.finished.couponId}.
     */
    @Value("${order.finished.couponId}")
    private Integer orderFinishedCouponId;

    /**
     * Value of {@code order.finished.coupon.day}, passed through to the coupon service
     * (presumably the coupon validity in days; confirm against CouponService).
     */
    @Value("${order.finished.coupon.day}")
    private Integer orderFinishedCouponDay;


    /**
     * Processes a batch of order-finished messages in order.
     *
     * @param msgs                       messages delivered by RocketMQ
     * @param consumeConcurrentlyContext consume context (not used)
     * @return {@code CONSUME_SUCCESS} when every message was handled (coupon issued or duplicate
     *         skipped); {@code RECONSUME_LATER} as soon as one message cannot be handled
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : msgs) {
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            LOGGER.info("received order finished message:{}", body);
            try {
                if (!handleMessage(body)) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            } catch (Exception e) {
                // Log with the payload so the failing message can be identified, then ask for redelivery.
                LOGGER.error("consume order finished message failed, message:{}", body, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Handles a single message body: de-duplicates by order number and issues the coupon.
     *
     * @param body JSON text of an {@link OrderFinishedMessageDTO}
     * @return {@code true} if the message is done (coupon issued or duplicate skipped),
     *         {@code false} if it should be redelivered
     */
    private boolean handleMessage(String body) {
        OrderFinishedMessageDTO message = JSON.parseObject(body, OrderFinishedMessageDTO.class);
        if (message == null) {
            // Nothing usable was parsed; avoid the NullPointerException and let the broker retry.
            LOGGER.warn("order finished message parsed to null, message:{}", body);
            return false;
        }

        String orderNo = message.getOrderNo();
        String phoneNumber = message.getPhoneNumber();

        CommonResponse<Boolean> setnxResponse = redisApi.setnx(
                RedisKeyConstant.ORDER_FINISHED_DUPLICATION_KEY_PREFIX + orderNo, orderNo, phoneNumber,
                LittleProjectTypeEnum.ROCKETMQ);
        if (setnxResponse == null
                || !Objects.equals(setnxResponse.getCode(), ErrorCodeEnum.SUCCESS.getCode())) {
            // The marker could not be written, so duplicate status is unknown. Issuing a coupon
            // now would bypass the idempotency guard; retry later instead.
            LOGGER.warn("set order finished duplication key failed, retry later orderNo:{}", orderNo);
            return false;
        }
        if (Boolean.FALSE.equals(setnxResponse.getData())) {
            // Successful call reporting FALSE is treated as "marker already present".
            LOGGER.info("duplicate consumer order finished message orderNo:{}", orderNo);
            return true;
        }

        // NOTE(review): if distributeCoupon throws, the marker written above is not cleared by
        // anything in this class, so the redelivered message will be skipped as a duplicate and
        // the coupon never issued. Releasing the marker on failure needs a delete operation on
        // RedisApi - confirm one exists and call it from the failure path.
        couponService.distributeCoupon(message.getBeid(), message.getUserId(),
                orderFinishedCouponId, orderFinishedCouponDay, message.getId(), phoneNumber);
        return true;
    }
}
